teg 深度理解AWS SQS与SNS:从原理到生产实战

葫芦的运维日志

浏览量 16 2025/11/25 15:19

深度理解 AWS SQS 与 SNS:从原理到生产实战

用了十几年消息队列,从 ActiveMQ 到 RabbitMQ 到 Kafka,再到 AWS 的 SQS/SNS,我最大的感受是:消息系统看起来简单,真正用好需要对底层机制有深刻理解。这篇文章不讲入门操作,专门讲那些文档里不会告诉你、但在生产环境中会让你半夜被叫醒的东西。

一、SQS 的本质:你以为的队列和它实际的样子

很多人把 SQS 当成一个简单的 FIFO 管道来用,这是第一个认知错误。SQS Standard Queue 本质上是一个分布式的消息存储系统,它的设计目标是无限吞吐极高可用,为此牺牲了严格有序和精确一次投递。

1.1 Standard Queue 的内部架构

SQS Standard Queue 在 AWS 内部是多个服务器组成的集群,消息写入时会被复制到多个节点。这意味着:

  • 消息可能被投递多次:因为分布式复制,同一条消息可能存在于多个节点,不同的消费者可能从不同节点拉到同一条消息。这不是 bug,是设计。
  • 消息顺序不保证:消息分散在多个节点上,消费者拉取时从不同节点获取,自然无法保证全局顺序。
  • 吞吐几乎无限:正因为是分布式的,没有单点瓶颈,每秒处理几万甚至几十万条消息都不是问题。
# 生产环境必须处理重复消息 — 幂等性设计
import hashlib
import boto3
import redis

sqs = boto3.client('sqs')
r = redis.Redis()

def process_message(msg):
    # 用消息内容生成唯一指纹
    body = msg['Body']
    msg_hash = hashlib.sha256(body.encode()).hexdigest()
    
    # Redis SETNX 实现幂等:处理过的消息直接跳过
    lock_key = f"sqs:processed:{msg_hash}"
    if not r.set(lock_key, "1", nx=True, ex=86400):
        print(f"重复消息,跳过: {msg_hash[:16]}")
        return True  # 返回 True 表示可以删除
    
    try:
        # 实际业务处理
        do_business_logic(body)
        return True
    except Exception as e:
        # 处理失败,删除幂等锁,允许重试
        r.delete(lock_key)
        raise

1.2 FIFO Queue 的代价

FIFO Queue 解决了顺序和精确一次的问题,但代价很大:

  • 吞吐限制:每个 Message Group ID 每秒最多 300 条消息(开启高吞吐模式可到 3000 条/秒,但有条件)
  • 高吞吐模式的陷阱:开启后,同一个 Message Group 内的消息在批量发送时可能不严格有序。也就是说,你为了吞吐牺牲了 FIFO 最核心的卖点
  • 成本更高:FIFO 队列的请求费用是 Standard 的约 1.2 倍
# FIFO Queue 的正确使用方式:按业务实体分组
import json
import uuid

def send_order_event(sqs_client, queue_url, order_id, event_type, data):
    """
    关键:MessageGroupId 按 order_id 分组
    同一个订单的事件严格有序,不同订单之间并行处理
    """
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({
            'order_id': order_id,
            'event_type': event_type,
            'data': data,
            'timestamp': int(time.time() * 1000)
        }),
        MessageGroupId=str(order_id),  # 同一订单的事件有序
        MessageDeduplicationId=str(uuid.uuid4())  # 或用内容去重
    )

# 错误示范:所有消息用同一个 GroupId
# 这会导致整个队列变成单线程,吞吐暴跌
# MessageGroupId="all-messages"  # 千万别这么干

1.3 我的选型建议

经过这些年的实践,我的选型原则很简单:

场景 选择 原因
异步任务(发邮件、生成报表) Standard Queue 不需要顺序,重复处理也无害
订单状态流转 FIFO Queue 同一订单的状态变更必须有序
日志收集 Standard Queue 丢一条无所谓,吞吐是关键
支付回调 FIFO Queue + 幂等 不能重复扣款,顺序也重要
每秒超过 3000 条且需要有序 Kinesis 或 Kafka SQS FIFO 扛不住

二、Visibility Timeout — 最容易出事的配置

Visibility Timeout 是 SQS 最核心也最容易配错的参数。消费者拉取消息后,这条消息对其他消费者"隐藏"一段时间,这就是 Visibility Timeout。如果在这段时间内消费者没有删除消息,消息会重新变为可见,被其他消费者再次拉取。

2.1 设太短:消息被重复处理

假设你的业务处理需要 30 秒,但 Visibility Timeout 设了 15 秒。消息处理到一半,就被另一个消费者拉走了,两个消费者同时处理同一条消息。这在支付场景下就是灾难。

2.2 设太长:故障恢复慢

如果消费者进程崩溃了,消息要等到 Visibility Timeout 过期才能被重新处理。设了 12 小时?那这条消息就要等 12 小时。

2.3 正确做法:动态延长

import threading
import boto3

class SQSHeartbeat:
    """
    消息心跳机制:处理过程中持续延长 Visibility Timeout
    类似于分布式锁的续期机制
    """
    def __init__(self, sqs_client, queue_url, receipt_handle, 
                 timeout=30, interval=10):
        self.sqs = sqs_client
        self.queue_url = queue_url
        self.receipt_handle = receipt_handle
        self.timeout = timeout
        self.interval = interval
        self._stop = threading.Event()
        self._thread = None
    
    def start(self):
        self._thread = threading.Thread(target=self._heartbeat, daemon=True)
        self._thread.start()
        return self
    
    def _heartbeat(self):
        while not self._stop.wait(self.interval):
            try:
                self.sqs.change_message_visibility(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=self.receipt_handle,
                    VisibilityTimeout=self.timeout
                )
            except Exception as e:
                print(f"心跳续期失败: {e}")
                break
    
    def stop(self):
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=5)

# 使用方式
def process_with_heartbeat(sqs, queue_url, message):
    heartbeat = SQSHeartbeat(sqs, queue_url, message['ReceiptHandle'])
    heartbeat.start()
    try:
        # 即使处理 5 分钟也不会超时
        result = long_running_task(message['Body'])
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result
    finally:
        heartbeat.stop()

经验法则:Visibility Timeout 设为预期处理时间的 6 倍,同时用心跳机制兜底。比如预期 10 秒处理完,Timeout 设 60 秒,心跳每 20 秒续一次。

三、Dead Letter Queue — 不是垃圾桶,是救命稻草

DLQ(死信队列)是消息处理失败后的最后一道防线。消息被消费指定次数(maxReceiveCount)后仍未被删除,就会被转移到 DLQ。很多团队配了 DLQ 就不管了,这是大错特错。

3.1 DLQ 的正确运维姿势

import json
import boto3
from datetime import datetime

class DLQMonitor:
    """
    DLQ 监控与自动重驱动
    生产环境必须有 DLQ 告警 + 定期巡检
    """
    def __init__(self, region='ap-southeast-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
    
    def get_dlq_depth(self, dlq_url):
        """获取 DLQ 中积压的消息数"""
        resp = self.sqs.get_queue_attributes(
            QueueUrl=dlq_url,
            AttributeNames=['ApproximateNumberOfMessages',
                          'ApproximateNumberOfMessagesNotVisible']
        )
        attrs = resp['Attributes']
        visible = int(attrs['ApproximateNumberOfMessages'])
        in_flight = int(attrs['ApproximateNumberOfMessagesNotVisible'])
        return visible + in_flight
    
    def redrive_messages(self, dlq_url, target_url, batch_size=10, max_messages=100):
        """
        将 DLQ 中的消息重新驱动到原始队列
        注意:要先修复导致失败的 bug,再重驱动
        """
        redrived = 0
        while redrived < max_messages:
            resp = self.sqs.receive_message(
                QueueUrl=dlq_url,
                MaxNumberOfMessages=min(batch_size, max_messages - redrived),
                WaitTimeSeconds=1
            )
            messages = resp.get('Messages', [])
            if not messages:
                break
            
            for msg in messages:
                # 发送到原始队列
                self.sqs.send_message(
                    QueueUrl=target_url,
                    MessageBody=msg['Body'],
                    MessageAttributes=msg.get('MessageAttributes', {})
                )
                # 从 DLQ 删除
                self.sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=msg['ReceiptHandle']
                )
                redrived += 1
        
        print(f"重驱动完成: {redrived} 条消息")
        return redrived

3.2 DLQ 配置的关键参数

{
  "RedrivePolicy": {
    "deadLetterTargetArn": "arn:aws:sqs:ap-southeast-1:123456789:my-queue-dlq",
    "maxReceiveCount": 3
  }
}

maxReceiveCount 怎么设?

  • 设 1:消息失败一次就进 DLQ。适合对延迟敏感、有完善人工处理流程的场景
  • 设 3-5:给瞬时故障(网络抖动、下游短暂不可用)自动恢复的机会。这是大多数场景的推荐值
  • 设 10+:基本没意义,如果重试 10 次还失败,说明不是瞬时问题

DLQ 本身也要配 DLQ 吗?不需要。DLQ 的消息应该由运维人员或自动化脚本处理,不应该再有"死信的死信"。但 DLQ 的消息保留期要设长一些(建议 14 天),给你足够的时间排查和修复。

四、Long Polling vs Short Polling — 省钱的关键

这是一个很多人忽略但直接影响成本的配置。

Short Polling(默认):每次 ReceiveMessage 立即返回,即使队列为空也算一次请求。如果你的消费者每秒轮询一次,一天就是 86400 次请求,一个月 259 万次。乘以消费者数量,费用很可观。

Long Polling:设置 WaitTimeSeconds(最大 20 秒),队列为空时请求会挂起等待,有消息到达时立即返回。这样空闲时段的请求次数大幅减少。

# 错误:Short Polling,疯狂烧钱
while True:
    resp = sqs.receive_message(QueueUrl=queue_url)  # 立即返回
    if not resp.get('Messages'):
        time.sleep(1)  # 自己加 sleep 也不如 Long Polling 优雅

# 正确:Long Polling
while True:
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,      # 一次最多拉 10 条
        WaitTimeSeconds=20,          # 最多等 20 秒
        MessageAttributeNames=['All'] # 拉取所有自定义属性
    )
    messages = resp.get('Messages', [])
    for msg in messages:
        process_and_delete(msg)

实测数据:一个中等流量的系统(日均 50 万条消息),从 Short Polling 切到 Long Polling 后,SQS 请求费用降低了约 60%。

五、SNS — 不只是"发通知"

很多人对 SNS 的理解停留在"发短信、发邮件",这严重低估了它。SNS 的核心价值是扇出(Fan-out):一条消息发布到 Topic,多个订阅者同时收到。这是事件驱动架构的基石。

5.1 SNS + SQS Fan-out 模式

这是 AWS 上最经典的解耦模式,没有之一:

                    ┌──→ SQS: order-processing    → 订单服务
                    │
用户下单 → SNS Topic ├──→ SQS: inventory-update   → 库存服务
                    │
                    ├──→ SQS: notification-send   → 通知服务
                    │
                    └──→ SQS: analytics-collect   → 数据分析

为什么不直接让订单服务调用其他服务的 API?因为:

  • 解耦:订单服务不需要知道有多少下游消费者,新增消费者只需要订阅 Topic
  • 容错:库存服务挂了不影响通知服务,每个 SQS 队列独立消费
  • 削峰:秒杀场景下,SNS 瞬间扇出到多个 SQS,每个服务按自己的速度消费
  • 可追溯:每个队列都有独立的 DLQ,哪个环节出问题一目了然

5.2 消息过滤 — SNS 的杀手锏

SNS 支持在订阅级别设置过滤策略,只有匹配的消息才会投递到对应的 SQS。这避免了消费者收到大量无关消息再自己过滤的浪费。

import boto3
import json

sns = boto3.client('sns')

# 创建订阅时设置过滤策略
sns.subscribe(
    TopicArn='arn:aws:sns:ap-southeast-1:123456789:order-events',
    Protocol='sqs',
    Endpoint='arn:aws:sqs:ap-southeast-1:123456789:high-value-orders',
    Attributes={
        'FilterPolicy': json.dumps({
            # 只接收金额大于 1000 的订单事件
            'order_amount': [{'numeric': ['>=', 1000]}],
            # 且事件类型是 created 或 paid
            'event_type': ['order_created', 'order_paid'],
            # 且地区是华东或华南
            'region': [{'prefix': 'east-'}, {'prefix': 'south-'}]
        }),
        # 基于消息体过滤(而非消息属性)
        'FilterPolicyScope': 'MessageBody'
    }
)

# 发布消息时带上属性
sns.publish(
    TopicArn='arn:aws:sns:ap-southeast-1:123456789:order-events',
    Message=json.dumps({
        'order_id': 'ORD-20250225-001',
        'amount': 2500,
        'items': ['MacBook Pro', 'AirPods']
    }),
    MessageAttributes={
        'event_type': {'DataType': 'String', 'StringValue': 'order_created'},
        'order_amount': {'DataType': 'Number', 'StringValue': '2500'},
        'region': {'DataType': 'String', 'StringValue': 'east-shanghai'}
    }
)

过滤策略支持的操作符

  • 精确匹配:["order_created"]
  • 前缀匹配:[{"prefix": "east-"}]
  • 数值范围:[{"numeric": [">=", 100, "<=", 1000]}]
  • 存在性检查:[{"exists": true}]
  • 否定匹配:[{"anything-but": "test"}]

合理使用过滤策略,可以减少 70% 以上的无效消息投递,直接省钱。

5.3 SNS 的投递重试策略

SNS 向 SQS 投递失败时会自动重试,但不同协议的重试策略差异很大:

  • SQS/Lambda:立即重试 3 次,几乎不会失败(都在 AWS 内网)
  • HTTP/HTTPS:分 4 个阶段共重试 38 次,持续约 23 分钟。可以自定义重试策略
  • Email/SMS:不重试,失败就丢了
// HTTP 端点的自定义重试策略
{
  "healthyRetryPolicy": {
    "numRetries": 10,
    "numNoDelayRetries": 2,
    "minDelayTarget": 5,
    "maxDelayTarget": 60,
    "numMinDelayRetries": 3,
    "numMaxDelayRetries": 5,
    "backoffFunction": "exponential"
  }
}

六、生产级消费者架构

写一个 while True 循环拉消息谁都会,但生产环境的消费者要考虑的东西多得多。

6.1 批量处理 + 并发控制

import boto3
import concurrent.futures
import signal
import sys

class ProductionConsumer:
    """
    生产级 SQS 消费者
    特性:优雅关闭、批量拉取、并发处理、错误隔离
    """
    def __init__(self, queue_url, handler, workers=5, region='ap-southeast-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.queue_url = queue_url
        self.handler = handler
        self.workers = workers
        self.running = True
        
        # 注册信号处理,支持优雅关闭
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)
    
    def _shutdown(self, signum, frame):
        print(f"收到信号 {signum},准备优雅关闭...")
        self.running = False
    
    def _process_single(self, message):
        """处理单条消息,异常不影响其他消息"""
        try:
            self.handler(message)
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True
        except Exception as e:
            print(f"处理失败 MessageId={message['MessageId']}: {e}")
            # 不删除消息,让它在 Visibility Timeout 后重新可见
            return False
    
    def run(self):
        print(f"消费者启动,{self.workers} 个工作线程")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
            while self.running:
                # 批量拉取
                resp = self.sqs.receive_message(
                    QueueUrl=self.queue_url,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=20,
                    MessageAttributeNames=['All'],
                    AttributeNames=['All']
                )
                messages = resp.get('Messages', [])
                if not messages:
                    continue
                
                # 并发处理
                futures = {
                    executor.submit(self._process_single, msg): msg 
                    for msg in messages
                }
                # 等待本批次全部完成再拉下一批
                concurrent.futures.wait(futures, timeout=300)
        
        print("消费者已优雅关闭")

# 使用
def handle_order(message):
    data = json.loads(message['Body'])
    print(f"处理订单: {data['order_id']}")

consumer = ProductionConsumer(
    queue_url='https://sqs.ap-southeast-1.amazonaws.com/123456789/orders',
    handler=handle_order,
    workers=10
)
consumer.run()

6.2 Lambda 作为消费者的注意事项

SQS 触发 Lambda 是最常见的 Serverless 模式,但有几个坑:

# Lambda SQS 触发器的正确写法
def lambda_handler(event, context):
    """
    关键点:
    1. event['Records'] 是批量的,可能有 1-10 条消息
    2. 部分成功时要用 batchItemFailures 报告失败的消息
    3. 不要在 Lambda 里调用 delete_message,成功的消息会自动删除
    """
    batch_item_failures = []
    
    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            process_order(body)
        except Exception as e:
            print(f"处理失败: {record['messageId']}, 错误: {e}")
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })
    
    # 返回失败的消息 ID,只有这些消息会被重试
    # 如果不返回这个,整批消息都会重试(包括已成功的)
    return {
        'batchItemFailures': batch_item_failures
    }

Lambda + SQS 的关键配置

  • BatchSize:一次给 Lambda 多少条消息。CPU 密集型任务设小(1-3),IO 密集型可以设大(10)
  • MaximumBatchingWindowInSeconds:最多等多久凑一批。设 5 秒可以提高批处理效率,但增加延迟
  • FunctionResponseTypes:必须包含 "ReportBatchItemFailures",否则部分失败会导致整批重试
  • 并发控制:SQS 会自动扩展 Lambda 并发数(最多 1000),用 ReservedConcurrency 限制,防止打爆下游

七、延迟队列与定时任务

SQS 支持两种延迟机制,很多人搞混:

  • Queue Delay(DelaySeconds):队列级别,所有消息入队后延迟 N 秒才可见。最大 15 分钟
  • Message Delay(DelaySeconds per message):消息级别,单条消息延迟。Standard Queue 支持,FIFO Queue 不支持消息级别延迟
# 场景:订单 30 分钟未支付自动取消
def create_order(order_id):
    # 1. 创建订单
    save_order(order_id, status='pending')
    
    # 2. 发送延迟消息,30 分钟后检查支付状态
    sqs.send_message(
        QueueUrl=TIMEOUT_CHECK_QUEUE,
        MessageBody=json.dumps({
            'order_id': order_id,
            'action': 'check_payment_timeout',
            'created_at': int(time.time())
        }),
        DelaySeconds=900  # 15 分钟(SQS 最大值)
    )
    # 如果需要 30 分钟,怎么办?见下面的方案

def handle_timeout_check(message):
    data = json.loads(message['Body'])
    order = get_order(data['order_id'])
    
    elapsed = int(time.time()) - data['created_at']
    
    if order['status'] == 'paid':
        return  # 已支付,不需要取消
    
    if elapsed < 1800:  # 还没到 30 分钟
        # 重新入队,继续等待(阶梯延迟)
        remaining = 1800 - elapsed
        delay = min(remaining, 900)  # 最多再延迟 15 分钟
        sqs.send_message(
            QueueUrl=TIMEOUT_CHECK_QUEUE,
            MessageBody=message['Body'],
            DelaySeconds=delay
        )
        return
    
    # 超时,取消订单
    cancel_order(data['order_id'])

超过 15 分钟延迟的解决方案

  1. 阶梯延迟(如上代码):消息到期后检查是否真的超时,没超时就重新入队。简单但有额外的 SQS 请求成本
  2. DynamoDB TTL + Stream:把定时任务写入 DynamoDB 并设置 TTL,过期时触发 Stream → Lambda。精确度在分钟级别
  3. EventBridge Scheduler:AWS 原生定时调度,支持一次性和周期性任务,精确到秒。这是目前最推荐的方案

八、消息体设计 — 被忽视的架构决策

消息体怎么设计,直接影响系统的可维护性和扩展性。

8.1 胖消息 vs 瘦消息

// 胖消息:把所有数据都放在消息里
{
  "event": "order_created",
  "order": {
    "id": "ORD-001",
    "user_id": "U-123",
    "user_name": "张三",
    "user_email": "[email]",
    "items": [...],
    "total_amount": 2500,
    "shipping_address": {...},
    "payment_method": {...}
  }
}

// 瘦消息:只放引用,消费者自己查
{
  "event": "order_created",
  "order_id": "ORD-001",
  "timestamp": 1740000000
}

我的建议:用"适度胖"的消息。

  • 包含消费者处理所需的核心数据,避免消费者再回查数据库(减少延迟和数据库压力)
  • 不包含可能频繁变化的数据(如用户昵称),这些数据在消费时可能已经过期
  • SQS 消息体最大 256KB,如果超了,把数据放 S3,消息里放 S3 的 key

8.2 大消息处理

# 超过 256KB 的消息:SQS Extended Client 模式
import boto3
import json
import uuid

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
BUCKET = 'my-sqs-large-messages'

def send_large_message(queue_url, data):
    body = json.dumps(data)
    
    if len(body.encode('utf-8')) > 200000:  # 留点余量
        # 存 S3
        s3_key = f"sqs-messages/{uuid.uuid4()}.json"
        s3.put_object(Bucket=BUCKET, Key=s3_key, Body=body)
        
        # 消息里放引用
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps({
                '_large_message': True,
                '_s3_bucket': BUCKET,
                '_s3_key': s3_key
            })
        )
    else:
        sqs.send_message(QueueUrl=queue_url, MessageBody=body)

def receive_large_message(message):
    body = json.loads(message['Body'])
    
    if body.get('_large_message'):
        # 从 S3 读取实际内容
        resp = s3.get_object(Bucket=body['_s3_bucket'], Key=body['_s3_key'])
        return json.loads(resp['Body'].read())
    
    return body

九、监控与告警 — 生产环境的眼睛

消息系统最怕的不是宕机,是消息悄悄积压没人知道。以下是必须配置的监控指标:

9.1 核心监控指标

# Terraform 配置 SQS 关键告警

# 1. 队列积压告警 — 最重要的指标
resource "aws_cloudwatch_metric_alarm" "sqs_backlog" {
  alarm_name          = "sqs-order-queue-backlog"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 3
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Maximum"
  threshold           = 1000  # 根据业务调整
  alarm_description   = "订单队列积压超过 1000 条"
  
  dimensions = {
    QueueName = "order-processing"
  }
  alarm_actions = [aws_sns_topic.alerts.arn]
}

# 2. 消息年龄告警 — 消息在队列里待了多久
resource "aws_cloudwatch_metric_alarm" "sqs_message_age" {
  alarm_name          = "sqs-order-queue-age"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ApproximateAgeOfOldestMessage"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Maximum"
  threshold           = 3600  # 最老的消息超过 1 小时
  alarm_description   = "订单队列最老消息超过 1 小时未处理"
  
  dimensions = {
    QueueName = "order-processing"
  }
  alarm_actions = [aws_sns_topic.alerts.arn]
}

# 3. DLQ 非空告警 — DLQ 有消息就要告警
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
  alarm_name          = "sqs-order-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "订单 DLQ 中有失败消息,需要人工介入"
  
  dimensions = {
    QueueName = "order-processing-dlq"
  }
  alarm_actions = [aws_sns_topic.alerts.arn]
}

9.2 自定义业务指标

# 在消费者中埋点,推送自定义指标到 CloudWatch
import boto3
import time

cloudwatch = boto3.client('cloudwatch')

def publish_consumer_metrics(queue_name, processing_time, success):
    cloudwatch.put_metric_data(
        Namespace='Custom/SQS',
        MetricData=[
            {
                'MetricName': 'MessageProcessingTime',
                'Value': processing_time,
                'Unit': 'Milliseconds',
                'Dimensions': [
                    {'Name': 'QueueName', 'Value': queue_name}
                ]
            },
            {
                'MetricName': 'MessageProcessingSuccess' if success 
                              else 'MessageProcessingFailure',
                'Value': 1,
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'QueueName', 'Value': queue_name}
                ]
            }
        ]
    )

十、安全最佳实践

10.1 队列访问策略

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowSNSToSendMessage",
      "Effect": "Allow",
      "Principal": {"Service": "sns.amazonaws.com"},
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:ap-southeast-1:123456789:order-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:ap-southeast-1:123456789:order-topic"
        }
      }
    },
    {
      "Sid": "DenyNonSSL",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "sqs:*",
      "Resource": "arn:aws:sqs:ap-southeast-1:123456789:order-queue",
      "Condition": {
        "Bool": {"aws:SecureTransport": "false"}
      }
    }
  ]
}

10.2 加密

  • 传输加密:SQS 默认使用 HTTPS,已经加密
  • 静态加密:开启 SSE-SQS(免费)或 SSE-KMS(可以控制密钥权限)
  • 敏感数据:不要在消息体里放密码、密钥等敏感信息。如果必须传递,先用 KMS 加密,消费者端解密

十一、成本优化实战

SQS/SNS 按请求次数计费,优化的核心就是减少请求次数。

优化手段 效果 实现方式
Long Polling 减少 60% 空轮询 WaitTimeSeconds=20
批量发送 减少 90% 发送请求 SendMessageBatch,一次最多 10 条
批量删除 减少 90% 删除请求 DeleteMessageBatch
SNS 消息过滤 减少无效投递 FilterPolicy
合并小消息 减少总请求数 应用层攒批后发送
# 批量操作示例:发送和删除都用 Batch
def batch_send(sqs, queue_url, messages):
    """批量发送,一次最多 10 条"""
    entries = []
    for i, msg in enumerate(messages):
        entries.append({
            'Id': str(i),
            'MessageBody': json.dumps(msg)
        })
        
        if len(entries) == 10:
            resp = sqs.send_message_batch(
                QueueUrl=queue_url, Entries=entries
            )
            _check_failures(resp)
            entries = []
    
    if entries:
        resp = sqs.send_message_batch(
            QueueUrl=queue_url, Entries=entries
        )
        _check_failures(resp)

def batch_delete(sqs, queue_url, messages):
    """批量删除已处理的消息"""
    entries = [
        {'Id': str(i), 'ReceiptHandle': msg['ReceiptHandle']}
        for i, msg in enumerate(messages)
    ]
    if entries:
        resp = sqs.delete_message_batch(
            QueueUrl=queue_url, Entries=entries
        )
        _check_failures(resp)

def _check_failures(resp):
    failed = resp.get('Failed', [])
    if failed:
        print(f"批量操作部分失败: {failed}")

十二、真实故障案例复盘

案例一:消息风暴导致 Lambda 并发打满

现象:上游突然发了 10 万条消息到 SQS,Lambda 并发瞬间飙到 1000(账号默认上限),其他 Lambda 函数全部无法执行。

根因:SQS 触发 Lambda 时会自动扩展并发,没有设置 ReservedConcurrency。

解决

  1. 给 SQS 触发的 Lambda 设置 ReservedConcurrency=100
  2. 给关键业务 Lambda 也设置 ReservedConcurrency,确保不被挤占
  3. 在 SQS 和 Lambda 之间加一层限流逻辑

案例二:FIFO 队列吞吐骤降

现象:FIFO 队列平时每秒处理 200 条消息,某天突然降到每秒 10 条。

根因:有一个消费者处理某条消息时卡住了(下游超时),这条消息的 MessageGroupId 对应的整个分组都被阻塞。而恰好 80% 的消息都用了同一个 GroupId。

解决

  1. 重新设计 MessageGroupId,按业务实体(如 user_id)分组,而不是按消息类型
  2. 给消费者加超时控制和心跳机制
  3. 监控单个 MessageGroup 的积压情况

案例三:DLQ 消息丢失

现象:DLQ 里的消息莫名其妙消失了,但没有人处理过。

根因:DLQ 的消息保留期(MessageRetentionPeriod)用了默认的 4 天,而运维团队的 DLQ 巡检周期是每周一次。消息在被发现之前就过期删除了。

解决:DLQ 的 MessageRetentionPeriod 设为 14 天(最大值),同时配置 DLQ 非空告警,有消息立即通知。

十三、SQS/SNS vs 其他消息服务的选择

维度 SQS/SNS Kinesis MSK (Kafka) EventBridge
运维成本 零运维 低(Serverless 模式) 高(要管集群) 零运维
吞吐 Standard 无限 / FIFO 3000/s 每分片 1MB/s 极高 有限
消息保留 最多 14 天 最多 365 天 无限 不保留
消息回溯 不支持 支持 支持 可归档到 S3
消费模型 拉取,消费即删 拉取,基于位移 拉取,基于 offset 推送
适用场景 任务队列、解耦 实时流处理 大数据管道 事件路由

选型口诀:任务解耦用 SQS,广播通知用 SNS,实时流用 Kinesis,大数据用 Kafka,事件路由用 EventBridge。不要用一个服务解决所有问题,组合使用才是正道。

总结

SQS 和 SNS 看起来是 AWS 最简单的服务之一,但要在生产环境中用好,需要理解它们的分布式本质。核心原则:

  1. 永远假设消息会重复,做好幂等
  2. Visibility Timeout 要配合心跳机制
  3. DLQ 不是配了就完事,要监控、要告警、要有处理流程
  4. Long Polling + 批量操作是省钱的关键
  5. SNS 的消息过滤能力被严重低估,用好了能省大量计算资源
  6. 监控三板斧:队列深度、消息年龄、DLQ 非空告警
  7. 消费者要有优雅关闭、并发控制、错误隔离的能力

这些经验都是从真实故障中总结出来的,希望能帮你少踩一些坑。

葫芦的运维日志

打赏

留言板

留言提交后需管理员审核通过才会显示

© 冰糖葫芦甜(bthlt.com) 2025 王梓打赏联系方式陕ICP备17005322号-1